package com.yaukie.socket.server;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yaukie.socket.constant.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * WebSocket server endpoint bound to a topic taken from the URL path
 * ({@code /websocket/{topicId}}).
 *
 * <p>Each opened connection registers its endpoint instance in a static cache keyed by
 * {@code topicId}. Messages received from a client are re-published to a Redis channel
 * ({@code Constants.REDIS_CHANNEL_ID}); {@link #sendMsgByTopicId(String, String)} pushes a
 * message to the locally connected client of a topic, if any.
 *
 * <p>The class also implements {@link BeanFactoryPostProcessor} solely to capture the Spring
 * bean factory in a static field, so that endpoint instances can look up
 * {@link StringRedisTemplate} without field injection.
 *
 * @author yuenbin
 * @since 2023/7/22
 **/
@Component
@ServerEndpoint("/websocket/{topicId}")
public class WebSocketServer  implements BeanFactoryPostProcessor  {

    // NOTE(review): declared but never read in this class; the idle timeout that is actually
    // applied to sessions is MAX_IDLE_TIMEOUT_MS below. Confirm which value is intended.
    private static final long sessionTimeout = 600000;

    /** Idle timeout, in milliseconds, applied to every session when it is opened. */
    private static final long MAX_IDLE_TIMEOUT_MS = 99999;

    private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);

    /** Endpoint instances with an active registration, keyed by topic id. */
    private static final Map<String, WebSocketServer> WEB_SOCKET_CACHE = new ConcurrentHashMap<>();

    /** Session of this connection; stays {@code null} until {@link #onOpen} has run. */
    private Session session;

    /** Topic id of this connection; stays {@code null} until {@link #onOpen} has run. */
    private String topicId;

    /** Number of topics currently registered in {@link #WEB_SOCKET_CACHE}. */
    private static final AtomicInteger WEB_SOCKET_LINE = new AtomicInteger(0);

    /** Bean factory captured in {@link #postProcessBeanFactory}; volatile because it is read from other threads. */
    private static volatile ConfigurableListableBeanFactory configurableListableBeanFactory;

    /**
     * Connection-open callback: registers this endpoint under its topic and greets the client.
     *
     * <p>If the topic already had a registered endpoint it is replaced and the online counter
     * is left untouched; otherwise the counter is incremented.
     *
     * @param session the newly opened session
     * @param topicId topic id extracted from the request path
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("topicId") String topicId) {
        session.setMaxIdleTimeout(MAX_IDLE_TIMEOUT_MS);
        // Assign the fields before publishing this instance through the shared map.
        this.session = session;
        this.topicId = topicId;
        // A single put() is atomic: a null previous value means the topic is new.
        if (WEB_SOCKET_CACHE.put(topicId, this) == null) {
            WEB_SOCKET_LINE.getAndIncrement();
        }
        this.sendMsg("服务器-" + localHostName() + "-连接成功！");
    }

    /**
     * Connection-close callback: unregisters this endpoint and decrements the online counter.
     *
     * <p>The entry is removed only when it still maps to this very instance, so the close of
     * a replaced (stale) connection does not evict the connection that replaced it.
     */
    @OnClose
    public void onClose() {
        if (topicId == null) {
            // onOpen never completed for this instance; there is nothing to unregister.
            return;
        }
        if (WEB_SOCKET_CACHE.remove(topicId, this)) {
            WEB_SOCKET_LINE.getAndDecrement();
            log.info("服务器-{}-退出连接，主题-{}！", localHostName(), topicId);
            log.info("当前服务器消息主题在线个数为-{}！", WEB_SOCKET_LINE.get());
        }
    }

    /**
     * Message callback: parses the client message as JSON and re-publishes it to Redis.
     *
     * <p>The JSON object is expected to carry the target topic under
     * {@code Constants.SOCKET_REDIS_KEY} and the payload under
     * {@code Constants.SOCKET_REDIS_VAL}. Messages that parse to nothing or lack either field
     * are logged and dropped. Malformed JSON still raises the parser's exception.
     *
     * @param message raw text received from the client
     * @param session session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        JSONObject jsonObject = JSON.parseObject(message);
        if (jsonObject == null) {
            log.warn("主题-{}-收到空消息，已忽略", topicId);
            return;
        }
        String targetTopic = jsonObject.getString(Constants.SOCKET_REDIS_KEY);
        String payload = jsonObject.getString(Constants.SOCKET_REDIS_VAL);
        if (targetTopic == null || payload == null) {
            log.warn("主题-{}-收到的消息缺少必要字段，已忽略，消息报文为：{}", topicId, message);
            return;
        }
        this.sendMessageToChannel(targetTopic, payload);
        log.info("发送主题为-{}-的消息，消息报文为：{}", topicId, message);
    }

    /**
     * Error callback: logs the failure together with its stack trace.
     *
     * @param session session on which the error occurred
     * @param e       the error that was raised
     */
    @OnError
    public void onError(Session session, Throwable e) {
        // The trailing throwable is not bound to a placeholder, so SLF4J prints its stack trace.
        log.error("消息主题为:{}出现发送异常,原因:{}", this.topicId, e.getMessage(), e);
    }

    /**
     * Publishes a message to the Redis channel {@code Constants.REDIS_CHANNEL_ID}.
     *
     * <p>The published body is a JSON object holding {@code topicId} under
     * {@code Constants.SOCKET_REDIS_KEY} and {@code msg} under
     * {@code Constants.SOCKET_REDIS_VAL}. A call with a {@code null} argument is logged and
     * skipped.
     *
     * @param topicId topic the message is addressed to
     * @param msg     message payload
     * @throws IllegalStateException if the Spring bean factory has not been captured yet
     */
    public void sendMessageToChannel(String topicId, String msg) {
        if (topicId == null || msg == null) {
            // ConcurrentHashMap rejects null values, so there is nothing valid to publish.
            log.warn("发布消息被忽略，主题或消息为空，主题-{}", topicId);
            return;
        }
        ConfigurableListableBeanFactory beanFactory = WebSocketServer.configurableListableBeanFactory;
        if (beanFactory == null) {
            throw new IllegalStateException("Bean factory not initialised; cannot obtain StringRedisTemplate");
        }

        Map<String, String> msgObj = new ConcurrentHashMap<>();
        msgObj.put(Constants.SOCKET_REDIS_KEY, topicId);
        msgObj.put(Constants.SOCKET_REDIS_VAL, msg);
        StringRedisTemplate template = beanFactory.getBean(StringRedisTemplate.class);
        template.convertAndSend(Constants.REDIS_CHANNEL_ID, JSON.toJSONString(msgObj));
    }

    /**
     * Sends a message to the client registered under the given topic, if there is one.
     *
     * <p>When no endpoint is registered for the topic the message is dropped. The calling
     * instance is deliberately NOT registered in that case: it may have no session, which
     * would make every later send to the topic fail and would distort the online counter.
     *
     * @param topicId topic whose client should receive the message
     * @param msg     text to send
     */
    public void sendMsgByTopicId(String topicId, String msg) {
        if (topicId == null) {
            // ConcurrentHashMap does not accept null keys.
            log.warn("主题为空，消息未发送");
            return;
        }
        WebSocketServer webSocketServer = WEB_SOCKET_CACHE.get(topicId);
        if (webSocketServer == null) {
            log.debug("主题-{}-当前没有在线连接，消息未发送", topicId);
            return;
        }
        webSocketServer.sendMsg(msg);
        log.info("消息由服务器-{}-发出", topicId);
    }

    /**
     * Sends a text message over this endpoint's session.
     *
     * <p>Nothing is sent when the message is {@code null} or the session is missing or
     * closed. Send failures are logged rather than propagated.
     *
     * @param msg text to send
     */
    public void sendMsg(String msg) {
        Session target = this.session;
        if (msg == null || target == null || !target.isOpen()) {
            log.warn("主题-{}-的连接不可用或消息为空，消息未发送", topicId);
            return;
        }
        try {
            // The blocking remote endpoint may throw IllegalStateException when two threads
            // send at once, so sends on one session are serialised.
            synchronized (target) {
                target.getBasicRemote().sendText(msg);
            }
        } catch (IOException | IllegalStateException e) {
            log.error("主题-{}-发送消息失败", topicId, e);
        }
    }

    /**
     * Captures the bean factory in a static field so that endpoint instances can resolve
     * Spring beans on demand.
     *
     * @param configurableListableBeanFactory the bean factory of the application context
     * @throws BeansException declared by the interface; not thrown by this implementation
     */
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
        WebSocketServer.configurableListableBeanFactory = configurableListableBeanFactory;
    }

    /** Returns the canonical host name of this machine, or {@code "unknown"} if it cannot be resolved. */
    private static String localHostName() {
        try {
            return InetAddress.getLocalHost().getCanonicalHostName();
        } catch (UnknownHostException e) {
            log.warn("无法解析本机主机名", e);
            return "unknown";
        }
    }
}
